package priv.lirenhe.dissertation.bdb;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.io.FileUtils;

import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.collections.StoredSortedMap;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseExistsException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.EnvironmentConfig;
/** 
 * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭 
 * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间 
 * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可, 
 * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现 
 *  
 * @contributor lirenhe 
 * @param <E> 
 */  
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements  
        Serializable {  
    private static final long serialVersionUID = 3427799316155220967L;  
    private transient BdbEnvironment dbEnv;            // 数据库环境,无需序列化  
    private transient Database queueDb;             // 数据库,用于保存值,使得支持队列持久化,无需序列化  
    private transient StoredMap<Long,E> queueMap;   // 持久化Map,Key为指针位置,Value为值,无需序列化  
    private transient String dbDir;                 // 数据库所在目录  
    private transient String dbName;                // 数据库名字  
    private AtomicLong headIndex;                   // 头部指针  
    private AtomicLong tailIndex;                   // 尾部指针  
    private transient E peekItem=null;              // 当前获取的值  
      
    /** 
     * 构造函数,传入BDB数据库 
     *  
     * @param db 
     * @param valueClass 
     * @param classCatalog 
     */  
    public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){  
        this.queueDb=db;  
        this.dbName=db.getDatabaseName();  
        headIndex=new AtomicLong(0);  
        tailIndex=new AtomicLong(0);  
        bindDatabase(queueDb,valueClass,classCatalog);  
    }  
    /** 
     * 构造函数,传入BDB数据库位置和名字,自己创建数据库 
     *  
     * @param dbDir 
     * @param dbName 
     * @param valueClass 
     */  
    public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){  
        headIndex=new AtomicLong(0);  
        tailIndex=new AtomicLong(0);  
        this.dbDir=dbDir;  
        this.dbName=dbName;  
        createAndBindDatabase(dbDir,dbName,valueClass);  
    }  
    
    /** 
     * 创建以及绑定数据库 
     *  
     * @param dbDir 
     * @param dbName 
     * @param valueClass 
     * @throws DatabaseNotFoundException 
     * @throws DatabaseExistsException 
     * @throws DatabaseException 
     * @throws IllegalArgumentException 
     */  
    private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException,  
    DatabaseExistsException,DatabaseException,IllegalArgumentException{  
        File envFile = null;  
        EnvironmentConfig envConfig = null;  
        DatabaseConfig dbConfig = null;  
        Database db=null;  
  
        try {  
            // 数据库位置  
            envFile = new File(dbDir);  
              
            // 数据库环境配置  
            envConfig = new EnvironmentConfig();  
            envConfig.setAllowCreate(true);  
            envConfig.setTransactional(false);  
              
            // 数据库配置  
            dbConfig = new DatabaseConfig();  
            dbConfig.setAllowCreate(true);  
            dbConfig.setTransactional(false);  
            dbConfig.setDeferredWrite(true);  
              
            // 创建环境  
            dbEnv = new BdbEnvironment(envFile, envConfig);  
            // 打开数据库  
            db = dbEnv.openDatabase(null, dbName, dbConfig);  
            // 绑定数据库  
            bindDatabase(db,valueClass,dbEnv.getClassCatalog());  
               
        } catch (DatabaseNotFoundException e) {  
            throw e;  
        } catch (DatabaseExistsException e) {  
            throw e;  
        } catch (DatabaseException e) {  
            throw e;  
        } catch (IllegalArgumentException e) {  
            throw e;  
        }  
    }  
    
    /** 
     * 绑定数据库 
     *  
     * @param db 
     * @param valueClass 
     * @param classCatalog 
     */  
    public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){  
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);  //基本数据的元组绑定        primitive：基本数据类型
        if(valueBinding == null) {  
            valueBinding = new SerialBinding<E>(classCatalog, valueClass);   // 序列化绑定  ，如果valueClass不是基本数据类型，就是用序列化绑定
        }  
        queueDb = db;  
        queueMap = new StoredSortedMap<Long,E>(  
        		queueDb,                                             // db  
                TupleBinding.getPrimitiveBinding(Long.class),   //Key  
                valueBinding,                                   // Value  
                true);                                          // allow write  
        
        if(!queueMap.isEmpty()){
        	//StoredSortedMap是个可排序的map集合
        	if(queueMap.isOrdered()){
        		headIndex = new AtomicLong((Long)queueMap.keySet().toArray()[0]);
            	tailIndex = new AtomicLong((Long)queueMap.keySet().toArray()[queueMap.size()-1]);
        	}else{
            	try {
    				throw new Exception(" queueMap不是顺序的");
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
            }
        }
    }  
      
    /** 
     * 值遍历器 
     */  
    @Override  
    public Iterator<E> iterator() {  
        return queueMap.values().iterator();  
    }  
    
    /** 
     * 大小 
     */  
    @Override  
    public int size() {  
        synchronized(tailIndex){  
            synchronized(headIndex){
                return queueMap.size(); 
            }  
        }  
    }  
    
    /** 
     * 插入值 
     */  
    @Override  
    public boolean offer(E e) {  
        synchronized(tailIndex){
//        	System.out.println("offer之前tailIndex="+tailIndex+"保证初始化时tailIndex正确而不是每次都为0！");
            queueMap.put(tailIndex.getAndIncrement(), e);   // 从尾部插入  
        }  
        return true;  
    }  
    
    /** 
     * 插入值并刷新
     */  
    public boolean offerAndFlush(E e) {  
        synchronized(tailIndex){
            queueMap.put(tailIndex.getAndIncrement(), e);   // 从尾部插入 
            sync();
        }  
        return true;  
    }
    
    /** 
     * 循环获取不为null值,从头部获取,返回值为null的话将去掉，再往后移取出不为null的值
     */  
    @Override  
    public E peek() {  
        synchronized(headIndex){  
        	
//        	System.out.println("peek() headIndex="+headIndex);
//        	System.out.println("peek() tailIndex="+tailIndex);
        	
            if(peekItem != null){  
                return peekItem;  
            }  
            E headItem=null;  
            while(headItem == null && headIndex.get() <= tailIndex.get()){ // 没有超出范围  ,注意tailIndex决定了为什么<=
                headItem = queueMap.get(headIndex.get()); 
//                System.out.println("headItem="+headItem);
                if(headItem != null){
                	peekItem = headItem; 
                	break;
                }else{
                	headIndex.getAndIncrement();
                }
            }  
            
            return peekItem;  
        }  
    }  
    
    /** 
     * 移出元素,移出头部元素 ，头部指针向后移
     */  
    @Override  
    public E poll() {  
        synchronized(headIndex){  
        	
//        	System.out.println("poll() headIndex="+headIndex);
        	
            E headItem=peek();  //peek返回值一定不为null
            if(headItem != null){  
                queueMap.remove(headIndex.getAndIncrement());
                peekItem = null;  
                return headItem;  
            }  
        }  
        return null;  
    } 
    
    /** 
     * 移出元素,移出头部元素 并刷新
     */  
    public E pollAndFlush() {  
        synchronized(headIndex){  
            E headItem=peek();  
            if(headItem != null){  
                queueMap.remove(headIndex.getAndIncrement());
                sync();
                peekItem=null;  
                return headItem;  
            }  
        }  
        return null;  
    }
    
    /**
     * 获取数据库中所有value，注意数据格式：key-value，需要使用split("-")进行字符串分割
     * @return
     */
    public List<String> getAllFromQueue(){
    	List<String> list = new ArrayList<String>();
    	synchronized (headIndex) {
			synchronized (tailIndex) {
				if(!queueMap.isEmpty()){
					Iterator<?> iterator = null;
					Entry<Long,String> entry = null;
					iterator = queueMap.entrySet().iterator();
					while(iterator.hasNext()){
						entry = (Entry<Long, String>) iterator.next();
						list.add(entry.getKey()+"-"+entry.getValue());
					}
				}
			}
		}
    	return list;
    }
    
    /**
     * 判断是否包含该key
     * @param e
     * @return
     */
    public boolean isContainKey(E e){
    	boolean flag = false;
    	flag = queueMap.containsKey(e);
    	return flag;
    }
    
    /**
     * 判断是否包含该value
     * @param e
     * @return
     */
    public boolean isContainValue(E e){
    	boolean flag = false;
    	flag = queueMap.containsValue(e);
    	return flag;
    }
    
    /**
     * 判断是否为空
     */
    @Override
    public boolean isEmpty(){
    	boolean flag = false;
    	flag = queueMap.isEmpty();
    	return flag;
    }
    
    /** 
     * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境 
     */  
    public void close(){  
        try {  
            if(queueDb != null){  
                queueDb.close();  
            }  
        } catch (DatabaseException e) {  
            e.printStackTrace();  
        } catch (UnsupportedOperationException e) {  
            e.printStackTrace();  
        }  
    }  
    
    /**
     * 刷新到磁盘中
     */
    public void sync(){
    	queueDb.sync();
    }
    
    /** 
     * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close() 
     */  
    @Override  
    public void clear() {  
       try {  
           close();  
           if(dbEnv != null && queueDb != null){  
                dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName);   
                dbEnv.close();  
           }  
        } catch (DatabaseNotFoundException e) {  
            e.printStackTrace();  
        } catch (DatabaseException e) {  
            e.printStackTrace();  
        } finally{  
            try {  
                if(this.dbDir != null){  
                    FileUtils.deleteDirectory(new File(this.dbDir));  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
      
}  
